Load Balancing Strategies for Inter-Service gRPC Calls
Business Scenario
Suppose we have an e-commerce microservice system with the following core services:
- User Service: handles user registration, login, and profile management
- Order Service: handles order creation, queries, and status updates
- Inventory Service: manages product inventory
- Payment Service: handles payment logic
In this scenario, when a user places an order, the order service must call the inventory service to check stock, the user service to validate the user, and the payment service to process the payment. Each service runs multiple instances; choosing which instance to call is the load balancing problem we need to solve.
Option 1: Kubernetes Native Service Discovery + DNS Load Balancing
When to use: the simplest, most direct option in a Kubernetes environment
Kubernetes provides built-in service discovery and load balancing through the Service resource. For a regular ClusterIP Service, DNS resolves the service name to a single virtual IP, and kube-proxy spreads incoming connections across the healthy Pods behind it.
Kubernetes Service configuration:
# inventory-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inventory-service
spec:
replicas: 3
selector:
matchLabels:
app: inventory-service
template:
metadata:
labels:
app: inventory-service
spec:
containers:
- name: inventory-service
image: inventory-service:v1.0
ports:
- containerPort: 8080
name: grpc
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:8080"]
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:8080"]
initialDelaySeconds: 15
periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
name: inventory-service
spec:
selector:
app: inventory-service
ports:
- name: grpc
port: 8080
targetPort: 8080
protocol: TCP
type: ClusterIP
  sessionAffinity: None # disable session affinity so new connections are spread across Pods
With session affinity disabled, kube-proxy picks a backend Pod independently for each new connection, instead of pinning every connection from a given client IP to the same Pod.
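The readiness and liveness probes above rely on grpc_health_probe, which only works if the server registers the standard gRPC health service. A minimal server-side sketch, assuming an inventory server listening on :8080 (the commented business-service registration line is an assumption):
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("listen failed: %v", err)
	}
	srv := grpc.NewServer()
	// Register the standard health service that grpc_health_probe queries.
	healthServer := health.NewServer()
	healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(srv, healthServer)
	// inventory_pb.RegisterInventoryServiceServer(srv, &inventoryServer{}) // business service (assumed)
	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve failed: %v", err)
	}
}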
Order service call code:
// The order service calls the inventory service through the Kubernetes Service
func (s *OrderService) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.CreateOrderResponse, error) {
	// Use the Kubernetes Service name directly;
	// kube-proxy balances new connections across Pods automatically
conn, err := grpc.Dial("inventory-service:8080",
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
return nil, fmt.Errorf("连接库存服务失败: %v", err)
}
defer conn.Close()
inventoryClient := inventory_pb.NewInventoryServiceClient(conn)
	// Note: this connection was balanced once at dial time, so every RPC on it hits the same Pod
inventoryResp, err := inventoryClient.CheckInventory(ctx, &inventory_pb.CheckInventoryRequest{
ProductId: req.ProductId,
Quantity: req.Quantity,
})
if err != nil {
return nil, fmt.Errorf("库存检查失败: %v", err)
}
return processOrder(inventoryResp, req)
}
// Connection pool optimization: reuse connections instead of dialing per request
type K8sGRPCClient struct {
	connections sync.Map // service name -> *grpc.ClientConn
mutex sync.Mutex
}
func (k *K8sGRPCClient) GetConnection(serviceName string) (*grpc.ClientConn, error) {
if conn, ok := k.connections.Load(serviceName); ok {
return conn.(*grpc.ClientConn), nil
}
k.mutex.Lock()
defer k.mutex.Unlock()
	// double-checked locking
if conn, ok := k.connections.Load(serviceName); ok {
return conn.(*grpc.ClientConn), nil
}
	// create a new connection
conn, err := grpc.Dial(fmt.Sprintf("%s:8080", serviceName),
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 5 * time.Second,
}),
)
if err != nil {
return nil, err
}
k.connections.Store(serviceName, conn)
return conn, nil
}
kube-proxy load balancing algorithm configuration:
# Use IPVS mode for better performance
apiVersion: v1
kind: ConfigMap
metadata:
name: kube-proxy-config
namespace: kube-system
data:
config.conf: |
apiVersion: kubeproxy.config.k8s.io/v1alpha1
kind: KubeProxyConfiguration
mode: "ipvs"
ipvs:
scheduler: "rr" # round-robin
# 其他算法: lc (least connection), dh (destination hashing)
clusterCIDR: "10.244.0.0/16"
Advantages:
- Zero configuration: no extra service discovery component is required
- Native integration: fully leverages the Kubernetes ecosystem
- Automatic health checking: driven by Pod readiness/liveness probes
- High availability: kube-proxy runs on every node, so there is no single point of failure
Suitable for:
- Kubernetes-native environments
- Simple load balancing needs
- Applications that do not need complex routing rules
Addendum: Session Affinity vs. Load Balancing
What sessionAffinity means
# current configuration
sessionAffinity: None # session affinity disabled
# for comparison: with session affinity enabled
sessionAffinity: ClientIP # pin clients to Pods by client IP
sessionAffinityConfig:
  clientIP:
    timeoutSeconds: 10800 # 3 hours
Note that sessionAffinity only influences the routing decision at connection establishment; it has no effect on connection keepalive.
Connection behavior after the session expires
Connection level vs. request level
// client code example
func main() {
	// establish a long-lived connection to the Service VIP
	conn, err := grpc.Dial("inventory-service:8080",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	client := pb.NewInventoryServiceClient(conn)
	ctx := context.Background()
	// many requests reuse the same connection
	for i := 0; i < 100; i++ {
		// kube-proxy balanced only the TCP connection at dial time,
		// so every request on this connection lands on the same Pod
		resp, err := client.GetInventory(ctx, &pb.GetInventoryRequest{
			ProductId: fmt.Sprintf("product-%d", i),
		})
		if err != nil {
			log.Printf("GetInventory failed: %v", err)
		} else {
			log.Printf("inventory: %v", resp)
		}
		time.Sleep(1 * time.Second)
	}
}
Option 2: Headless Service + gRPC Client-Side Load Balancing
When to use: clients should connect to Pods directly for the best performance
A Headless Service is assigned no ClusterIP; DNS returns all Pod IPs directly, so the client can implement its own load balancing.
Headless Service configuration:
# headless-inventory-service.yaml
apiVersion: v1
kind: Service
metadata:
name: inventory-service-headless
spec:
  clusterIP: None # Headless Service: no ClusterIP is allocated
selector:
app: inventory-service
ports:
- name: grpc
port: 8080
targetPort: 8080
protocol: TCP
---
# Note: because the Service above has a selector, Kubernetes creates and
# maintains its Endpoints automatically. A manual Endpoints object like the
# following is only needed for selector-less Services; it is shown here to
# illustrate the structure that DNS and resolvers consume.
apiVersion: v1
kind: Endpoints
metadata:
name: inventory-service-headless
subsets:
- addresses:
- ip: 10.244.1.10
targetRef:
kind: Pod
name: inventory-service-pod-1
- ip: 10.244.1.11
targetRef:
kind: Pod
name: inventory-service-pod-2
ports:
- name: grpc
port: 8080
protocol: TCP
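Before writing a custom resolver, note that gRPC's built-in dns resolver combined with the round_robin policy already gives per-request client-side balancing against the Headless Service above; a minimal sketch:
// Dial the Headless Service through gRPC's built-in dns resolver.
// It resolves to the individual Pod IPs, and round_robin spreads
// requests across them (pick_first is the default otherwise).
conn, err := grpc.Dial(
	"dns:///inventory-service-headless.default.svc.cluster.local:8080",
	grpc.WithInsecure(),
	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`),
)
A custom resolver is only worth the effort when you need metadata-aware or latency-aware selection, as implemented next.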
Custom gRPC resolver implementation:
// resolver backed by a Kubernetes Headless Service
type K8sResolver struct {
target resolver.Target
cc resolver.ClientConn
serviceName string
namespace string
client kubernetes.Interface
ctx context.Context
cancel context.CancelFunc
}
func (r *K8sResolver) ResolveNow(resolver.ResolveNowOptions) {
go r.resolve()
}
func (r *K8sResolver) resolve() {
	// fetch the Endpoints object
endpoints, err := r.client.CoreV1().Endpoints(r.namespace).Get(
context.TODO(), r.serviceName, metav1.GetOptions{})
if err != nil {
log.Printf("获取 Endpoints 失败: %v", err)
return
}
var addresses []resolver.Address
for _, subset := range endpoints.Subsets {
for _, addr := range subset.Addresses {
for _, port := range subset.Ports {
if port.Name == "grpc" {
				grpcAddr := fmt.Sprintf("%s:%d", addr.IP, port.Port)
				meta := map[string]interface{}{}
				if addr.TargetRef != nil {
					meta["pod_name"] = addr.TargetRef.Name
				}
				if addr.NodeName != nil { // NodeName is a *string and may be nil
					meta["zone"] = *addr.NodeName
				}
				addresses = append(addresses, resolver.Address{
					Addr:     grpcAddr,
					Metadata: meta,
				})
}
}
}
}
	// push the updated address list to gRPC
r.cc.UpdateState(resolver.State{Addresses: addresses})
}
func (r *K8sResolver) watch() {
watchlist := cache.NewListWatchFromClient(
r.client.CoreV1().RESTClient(),
"endpoints",
r.namespace,
fields.OneTermEqualSelector("metadata.name", r.serviceName),
)
_, controller := cache.NewInformer(
watchlist,
&v1.Endpoints{},
time.Second*10,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { r.resolve() },
UpdateFunc: func(oldObj, newObj interface{}) { r.resolve() },
DeleteFunc: func(obj interface{}) { r.resolve() },
},
)
controller.Run(r.ctx.Done())
}
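The usage example later registers a K8sResolverBuilder, which is not defined above. A minimal sketch, assuming the k8s:// scheme, in-cluster credentials, and RBAC access to Endpoints (target.Endpoint() is a method in recent grpc-go releases; older versions expose it as a field):
type K8sResolverBuilder struct{}

func (b *K8sResolverBuilder) Scheme() string { return "k8s" }

func (b *K8sResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn,
	opts resolver.BuildOptions) (resolver.Resolver, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	// Split "inventory-service-headless.default.svc.cluster.local"
	// into service name and namespace.
	parts := strings.SplitN(target.Endpoint(), ".", 3)
	name, namespace := parts[0], "default"
	if len(parts) > 1 {
		namespace = parts[1]
	}
	ctx, cancel := context.WithCancel(context.Background())
	r := &K8sResolver{
		target: target, cc: cc, serviceName: name,
		namespace: namespace, client: client, ctx: ctx, cancel: cancel,
	}
	r.resolve()  // publish the initial address list
	go r.watch() // keep it updated from the Endpoints informer
	return r, nil
}

// Close completes the resolver.Resolver interface and stops the watch.
func (r *K8sResolver) Close() { r.cancel() }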
// client-side load balancer
type PodAwareBalancer struct {
addresses []resolver.Address
healthMap sync.Map // podIP -> bool
responseTime sync.Map // podIP -> time.Duration
current int32
mutex sync.RWMutex
}
func (p *PodAwareBalancer) Pick() string {
p.mutex.RLock()
defer p.mutex.RUnlock()
healthyAddresses := p.getHealthyAddresses()
if len(healthyAddresses) == 0 {
return ""
}
	// latency-aware selection: prefer the fastest Pod
var bestAddr string
var bestTime time.Duration = time.Hour
for _, addr := range healthyAddresses {
if respTime, ok := p.responseTime.Load(addr.Addr); ok {
if rt := respTime.(time.Duration); rt < bestTime {
bestTime = rt
bestAddr = addr.Addr
}
}
}
if bestAddr == "" {
		// fall back to round-robin when there is no latency data
idx := atomic.AddInt32(&p.current, 1) % int32(len(healthyAddresses))
bestAddr = healthyAddresses[idx].Addr
}
return bestAddr
}
func (p *PodAwareBalancer) getHealthyAddresses() []resolver.Address {
var healthy []resolver.Address
for _, addr := range p.addresses {
if isHealthy, ok := p.healthMap.Load(addr.Addr); !ok || isHealthy.(bool) {
healthy = append(healthy, addr)
}
}
return healthy
}
// health check implementation
func (p *PodAwareBalancer) startHealthCheck() {
ticker := time.NewTicker(10 * time.Second)
go func() {
for range ticker.C {
for _, addr := range p.addresses {
go p.checkPodHealth(addr.Addr)
}
}
}()
}
func (p *PodAwareBalancer) checkPodHealth(address string) {
	start := time.Now()
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// grpc.WithTimeout is deprecated and is a no-op without WithBlock;
	// a context deadline on a blocking dial enforces the 3s budget.
	conn, err := grpc.DialContext(ctx, address,
		grpc.WithInsecure(),
		grpc.WithBlock(),
	)
	if err != nil {
		p.healthMap.Store(address, false)
		return
	}
	defer conn.Close()
	client := grpc_health_v1.NewHealthClient(conn)
	_, err = client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
duration := time.Since(start)
if err != nil {
p.healthMap.Store(address, false)
} else {
p.healthMap.Store(address, true)
p.responseTime.Store(address, duration)
}
}
Order service usage example:
func NewInventoryClient() (*InventoryClient, error) {
	// register the custom resolver
resolver.Register(&K8sResolverBuilder{})
conn, err := grpc.Dial(
"k8s://inventory-service-headless.default.svc.cluster.local",
grpc.WithInsecure(),
		// grpc.WithBalancerName is deprecated; select the (separately
		// registered) balancing policy via the service config instead
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"pod_aware_balancer":{}}]}`),
)
if err != nil {
return nil, err
}
return &InventoryClient{
conn: conn,
client: inventory_pb.NewInventoryServiceClient(conn),
}, nil
}
Advantages:
- Direct connections: clients talk to Pods with no extra hop
- Awareness: the client can track Pod health and response times
- Flexibility: arbitrarily complex balancing algorithms are possible
- Fast failover: the client detects Pod failures directly
Suitable for:
- Applications with very high performance requirements
- Custom load balancing strategies
- Long-lived connection scenarios
Option 3: etcd Service Discovery + gRPC Load Balancing
When to use: spanning multiple Kubernetes clusters, or when more flexible service discovery is required
As a distributed key-value store, etcd can provide cross-cluster service discovery and configuration management.
etcd service registration implementation:
// etcd service registrar
type ETCDRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
keepAlive <-chan *clientv3.LeaseKeepAliveResponse
key string
value string
}
func NewETCDRegistry(endpoints []string, serviceName, instanceAddr string) (*ETCDRegistry, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
	// create a lease with a 30-second TTL
	lease, err := client.Grant(context.Background(), 30)
if err != nil {
return nil, err
}
	// service instance metadata
instanceInfo := ServiceInstance{
ID: fmt.Sprintf("%s-%s", serviceName, instanceAddr),
Name: serviceName,
Address: instanceAddr,
Port: 8080,
Metadata: map[string]string{
"version": "v1.0",
"zone": os.Getenv("ZONE"),
"weight": "100",
"protocol": "grpc",
"register_time": time.Now().Format(time.RFC3339),
},
}
	value, err := json.Marshal(instanceInfo)
	if err != nil {
		return nil, err
	}
key := fmt.Sprintf("/services/%s/%s", serviceName, instanceInfo.ID)
registry := &ETCDRegistry{
client: client,
leaseID: lease.ID,
key: key,
value: string(value),
}
return registry, nil
}
func (r *ETCDRegistry) Register() error {
	// register the service under the lease
_, err := r.client.Put(context.Background(), r.key, r.value, clientv3.WithLease(r.leaseID))
if err != nil {
return err
}
	// start the keepalive heartbeat
r.keepAlive, err = r.client.KeepAlive(context.Background(), r.leaseID)
if err != nil {
return err
}
go r.watchKeepAlive()
return nil
}
func (r *ETCDRegistry) watchKeepAlive() {
	for resp := range r.keepAlive {
		if resp == nil {
			log.Println("etcd keepalive failed; attempting to re-register")
			r.reRegister()
			break
		}
		log.Printf("etcd heartbeat OK, TTL: %d", resp.TTL)
	}
}
}
func (r *ETCDRegistry) reRegister() {
	// create a fresh lease and register again
lease, err := r.client.Grant(context.Background(), 30)
if err != nil {
log.Printf("重新创建租约失败: %v", err)
return
}
r.leaseID = lease.ID
if err := r.Register(); err != nil {
log.Printf("重新注册失败: %v", err)
}
}
func (r *ETCDRegistry) Deregister() error {
	// revoking the lease deletes the service key automatically
_, err := r.client.Revoke(context.Background(), r.leaseID)
return err
}
type ServiceInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Address string `json:"address"`
Port int `json:"port"`
Metadata map[string]string `json:"metadata"`
}
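A typical lifecycle on the service side: register once the gRPC server is up, then deregister on SIGTERM so the lease is revoked immediately rather than lingering until the 30-second TTL expires. A minimal sketch (the endpoints and the instance address are assumptions):
func runWithRegistry() error {
	reg, err := NewETCDRegistry(
		[]string{"etcd-1:2379", "etcd-2:2379", "etcd-3:2379"},
		"inventory-service", "10.244.1.10",
	)
	if err != nil {
		return err
	}
	if err := reg.Register(); err != nil {
		return err
	}
	// Revoke the lease on shutdown so consumers drop this instance at once.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	return reg.Deregister()
}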
etcd service discovery implementation:
// etcd service discoverer
type ETCDDiscovery struct {
client *clientv3.Client
serviceName string
instances sync.Map // instanceID -> ServiceInstance
balancer LoadBalancer
watchCtx context.Context
watchCancel context.CancelFunc
}
func NewETCDDiscovery(endpoints []string, serviceName string) (*ETCDDiscovery, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
discovery := &ETCDDiscovery{
client: client,
serviceName: serviceName,
balancer: NewConsistentHashBalancer(),
watchCtx: ctx,
watchCancel: cancel,
}
	// load the initial set of instances
if err := discovery.loadInstances(); err != nil {
return nil, err
}
	// start watching for changes
go discovery.watchChanges()
return discovery, nil
}
func (d *ETCDDiscovery) loadInstances() error {
prefix := fmt.Sprintf("/services/%s/", d.serviceName)
resp, err := d.client.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
var instances []ServiceInstance
for _, kv := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(kv.Value, &instance); err != nil {
log.Printf("解析服务实例失败: %v", err)
continue
}
d.instances.Store(instance.ID, instance)
instances = append(instances, instance)
}
d.balancer.UpdateInstances(instances)
log.Printf("加载 %s 服务实例 %d 个", d.serviceName, len(instances))
return nil
}
func (d *ETCDDiscovery) watchChanges() {
prefix := fmt.Sprintf("/services/%s/", d.serviceName)
watchChan := d.client.Watch(d.watchCtx, prefix, clientv3.WithPrefix())
for watchResp := range watchChan {
for _, event := range watchResp.Events {
switch event.Type {
case clientv3.EventTypePut:
var instance ServiceInstance
if err := json.Unmarshal(event.Kv.Value, &instance); err != nil {
log.Printf("解析新增服务实例失败: %v", err)
continue
}
d.instances.Store(instance.ID, instance)
d.updateBalancer()
log.Printf("新增服务实例: %s", instance.ID)
case clientv3.EventTypeDelete:
				// extract the instance ID from the key
key := string(event.Kv.Key)
instanceID := key[strings.LastIndex(key, "/")+1:]
d.instances.Delete(instanceID)
d.updateBalancer()
log.Printf("删除服务实例: %s", instanceID)
}
}
}
}
func (d *ETCDDiscovery) updateBalancer() {
var instances []ServiceInstance
d.instances.Range(func(key, value interface{}) bool {
instances = append(instances, value.(ServiceInstance))
return true
})
d.balancer.UpdateInstances(instances)
}
func (d *ETCDDiscovery) GetInstance(key string) (*ServiceInstance, error) {
instanceID := d.balancer.Select(key)
if instanceID == "" {
return nil, errors.New("没有可用的服务实例")
}
if instance, ok := d.instances.Load(instanceID); ok {
inst := instance.(ServiceInstance)
return &inst, nil
}
return nil, errors.New("服务实例不存在")
}
Consistent hash load balancer:
// consistent hash load balancer
type ConsistentHashBalancer struct {
ring map[uint32]string // hash -> instanceID
sortedKeys []uint32
instances map[string]ServiceInstance
mutex sync.RWMutex
	replicas int // number of virtual nodes per instance
}
func NewConsistentHashBalancer() *ConsistentHashBalancer {
return &ConsistentHashBalancer{
ring: make(map[uint32]string),
instances: make(map[string]ServiceInstance),
		replicas: 100, // 100 virtual nodes per instance
}
}
func (c *ConsistentHashBalancer) UpdateInstances(instances []ServiceInstance) {
c.mutex.Lock()
defer c.mutex.Unlock()
	// reset the ring
c.ring = make(map[uint32]string)
c.sortedKeys = []uint32{}
c.instances = make(map[string]ServiceInstance)
	// add every instance to the hash ring
for _, instance := range instances {
c.instances[instance.ID] = instance
c.addToRing(instance.ID)
}
	// sort the hash values
sort.Slice(c.sortedKeys, func(i, j int) bool {
return c.sortedKeys[i] < c.sortedKeys[j]
})
}
func (c *ConsistentHashBalancer) addToRing(instanceID string) {
for i := 0; i < c.replicas; i++ {
key := fmt.Sprintf("%s-%d", instanceID, i)
hash := c.hash(key)
c.ring[hash] = instanceID
c.sortedKeys = append(c.sortedKeys, hash)
}
}
func (c *ConsistentHashBalancer) Select(key string) string {
c.mutex.RLock()
defer c.mutex.RUnlock()
if len(c.sortedKeys) == 0 {
return ""
}
hash := c.hash(key)
	// find the first node whose hash is >= the key's hash
idx := sort.Search(len(c.sortedKeys), func(i int) bool {
return c.sortedKeys[i] >= hash
})
	// wrap around to the first node (it's a ring)
if idx == len(c.sortedKeys) {
idx = 0
}
return c.ring[c.sortedKeys[idx]]
}
func (c *ConsistentHashBalancer) hash(key string) uint32 {
h := fnv.New32a()
h.Write([]byte(key))
return h.Sum32()
}
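The LoadBalancer interface that ETCDDiscovery holds is never spelled out above; inferred from the calls it makes, it presumably looks like the sketch below, followed by a quick demonstration that removing an instance only remaps the keys that hashed to it:
// Interface inferred from how ETCDDiscovery uses its balancer field.
type LoadBalancer interface {
	UpdateInstances(instances []ServiceInstance)
	Select(key string) string // returns an instance ID, or "" if none available
}

func demoConsistency() {
	lb := NewConsistentHashBalancer()
	lb.UpdateInstances([]ServiceInstance{{ID: "inst-a"}, {ID: "inst-b"}, {ID: "inst-c"}})
	before := lb.Select("user-42")
	// Drop one instance: most keys keep their previous mapping.
	lb.UpdateInstances([]ServiceInstance{{ID: "inst-a"}, {ID: "inst-b"}})
	after := lb.Select("user-42")
	fmt.Printf("before=%s after=%s\n", before, after)
}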
Order service using etcd discovery:
type OrderService struct {
inventoryDiscovery *ETCDDiscovery
paymentDiscovery *ETCDDiscovery
userDiscovery *ETCDDiscovery
clientPool *GRPCClientPool
}
func NewOrderService() (*OrderService, error) {
etcdEndpoints := []string{"etcd-1:2379", "etcd-2:2379", "etcd-3:2379"}
inventoryDiscovery, err := NewETCDDiscovery(etcdEndpoints, "inventory-service")
if err != nil {
return nil, err
}
paymentDiscovery, err := NewETCDDiscovery(etcdEndpoints, "payment-service")
if err != nil {
return nil, err
}
return &OrderService{
inventoryDiscovery: inventoryDiscovery,
paymentDiscovery: paymentDiscovery,
clientPool: NewGRPCClientPool(),
}, nil
}
func (s *OrderService) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.CreateOrderResponse, error) {
	// use the user ID as the consistent-hash key so requests from the
	// same user land on the same instance
	userKey := fmt.Sprintf("user-%s", req.UserId)
	// pick an inventory service instance
inventoryInstance, err := s.inventoryDiscovery.GetInstance(req.ProductId)
if err != nil {
return nil, fmt.Errorf("获取库存服务实例失败: %v", err)
}
	// pick a payment service instance
paymentInstance, err := s.paymentDiscovery.GetInstance(userKey)
if err != nil {
return nil, fmt.Errorf("获取支付服务实例失败: %v", err)
}
	// call the downstream services concurrently
var wg sync.WaitGroup
var inventoryResp *inventory_pb.CheckInventoryResponse
var paymentResp *payment_pb.PreparePaymentResponse
var inventoryErr, paymentErr error
	// check inventory
wg.Add(1)
go func() {
defer wg.Done()
client, err := s.clientPool.GetInventoryClient(inventoryInstance)
if err != nil {
inventoryErr = err
return
}
inventoryResp, inventoryErr = client.CheckInventory(ctx, &inventory_pb.CheckInventoryRequest{
ProductId: req.ProductId,
Quantity: req.Quantity,
})
}()
	// prepare the payment
wg.Add(1)
go func() {
defer wg.Done()
client, err := s.clientPool.GetPaymentClient(paymentInstance)
if err != nil {
paymentErr = err
return
}
paymentResp, paymentErr = client.PreparePayment(ctx, &payment_pb.PreparePaymentRequest{
UserId: req.UserId,
Amount: req.Amount,
})
}()
wg.Wait()
	if inventoryErr != nil {
		return nil, fmt.Errorf("inventory check failed: %v", inventoryErr)
	}
	if paymentErr != nil {
		return nil, fmt.Errorf("payment preparation failed: %v", paymentErr)
	}
return s.processOrderCreation(inventoryResp, paymentResp, req)
}
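The WaitGroup-plus-shared-variables pattern above works, but golang.org/x/sync/errgroup expresses the same fan-out with less bookkeeping and cancels the sibling call as soon as one fails. A hedged alternative sketch of the same two calls (the helper name is an assumption):
func (s *OrderService) checkAndPrepare(ctx context.Context, req *pb.CreateOrderRequest,
	inv, pay *ServiceInstance) (*inventory_pb.CheckInventoryResponse, *payment_pb.PreparePaymentResponse, error) {
	g, ctx := errgroup.WithContext(ctx)
	var invResp *inventory_pb.CheckInventoryResponse
	var payResp *payment_pb.PreparePaymentResponse
	g.Go(func() error {
		client, err := s.clientPool.GetInventoryClient(inv)
		if err != nil {
			return err
		}
		invResp, err = client.CheckInventory(ctx, &inventory_pb.CheckInventoryRequest{
			ProductId: req.ProductId, Quantity: req.Quantity,
		})
		return err
	})
	g.Go(func() error {
		client, err := s.clientPool.GetPaymentClient(pay)
		if err != nil {
			return err
		}
		payResp, err = client.PreparePayment(ctx, &payment_pb.PreparePaymentRequest{
			UserId: req.UserId, Amount: req.Amount,
		})
		return err
	})
	// Wait returns the first non-nil error; the shared ctx is then cancelled.
	if err := g.Wait(); err != nil {
		return nil, nil, err
	}
	return invResp, payResp, nil
}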
// gRPC client connection pool
type GRPCClientPool struct {
inventoryClients sync.Map // address -> InventoryServiceClient
paymentClients sync.Map // address -> PaymentServiceClient
connections sync.Map // address -> *grpc.ClientConn
}
func (p *GRPCClientPool) GetInventoryClient(instance *ServiceInstance) (inventory_pb.InventoryServiceClient, error) {
address := fmt.Sprintf("%s:%d", instance.Address, instance.Port)
if client, ok := p.inventoryClients.Load(address); ok {
return client.(inventory_pb.InventoryServiceClient), nil
}
conn, err := p.getConnection(address)
if err != nil {
return nil, err
}
client := inventory_pb.NewInventoryServiceClient(conn)
p.inventoryClients.Store(address, client)
return client, nil
}
func (p *GRPCClientPool) getConnection(address string) (*grpc.ClientConn, error) {
if conn, ok := p.connections.Load(address); ok {
return conn.(*grpc.ClientConn), nil
}
conn, err := grpc.Dial(address,
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}),
		grpc.WithStatsHandler(&LatencyStatsHandler{}), // record per-RPC latency
)
if err != nil {
return nil, err
}
p.connections.Store(address, conn)
return conn, nil
}
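getConnection attaches a LatencyStatsHandler that is not defined above. A minimal stats.Handler that logs per-RPC latency might look like this (the log format is an assumption; a real pool would feed these samples back into the balancer):
// Minimal stats.Handler that records per-RPC latency.
type LatencyStatsHandler struct{}

func (h *LatencyStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	return ctx
}

func (h *LatencyStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
	// stats.End fires once per RPC and carries begin/end timestamps.
	if end, ok := s.(*stats.End); ok {
		log.Printf("rpc latency: %v, err: %v", end.EndTime.Sub(end.BeginTime), end.Error)
	}
}

func (h *LatencyStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	return ctx
}

func (h *LatencyStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {}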
etcd configuration and deployment:
# etcd-cluster.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: etcd-config
data:
etcd.conf: |
name: etcd-1
data-dir: /var/lib/etcd
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://etcd-1:2379
listen-peer-urls: http://0.0.0.0:2380
initial-advertise-peer-urls: http://etcd-1:2380
initial-cluster: etcd-1=http://etcd-1:2380,etcd-2=http://etcd-2:2380,etcd-3=http://etcd-3:2380
initial-cluster-state: new
auto-compaction-retention: 1
quota-backend-bytes: 8589934592
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: etcd
spec:
serviceName: etcd
replicas: 3
selector:
matchLabels:
app: etcd
template:
metadata:
labels:
app: etcd
spec:
containers:
- name: etcd
image: quay.io/coreos/etcd:v3.5.0
command:
- etcd
- --config-file=/etc/etcd/etcd.conf
ports:
- containerPort: 2379
name: client
- containerPort: 2380
name: peer
volumeMounts:
- name: etcd-config
mountPath: /etc/etcd
- name: etcd-data
mountPath: /var/lib/etcd
volumes:
- name: etcd-config
configMap:
name: etcd-config
volumeClaimTemplates:
- metadata:
name: etcd-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi
Advantages:
- Cross-cluster discovery: supports service discovery across multiple Kubernetes clusters
- Strong consistency: etcd provides strong consistency guarantees
- Rich metadata: arbitrary service metadata can be stored alongside each instance
- Watch mechanism: service changes are observed in real time